import { Channel, ConsumeMessage } from 'amqplib';
import { EventEmitter } from 'events';
import { ConnectionPool } from './connection-pool';
import { Message } from './message';
import { MessageHandler, MessageSerializer, ConsumerOptions, Logger, MessageError, isMessageData, ManagedConnection } from '@/types';
import { JSONSerializer } from './serializers';
import { createLogger } from '@/utils/logger';

/**
 * Base consumer class for consuming messages
 */
export class Consumer extends EventEmitter {
  protected readonly connectionPool: ConnectionPool;
  protected readonly handler: MessageHandler;
  protected readonly serializer: MessageSerializer;
  protected readonly logger: Logger;
  protected channel?: Channel;
  protected connection?: ManagedConnection;
  protected consumerTag?: string;
  protected isConsuming = false;

  constructor(
    connectionPool: ConnectionPool,
    handler: MessageHandler,
    serializer: MessageSerializer = new JSONSerializer(),
    logger?: Logger
  ) {
    super();
    this.connectionPool = connectionPool;
    this.handler = handler;
    this.serializer = serializer;
    this.logger = logger ?? createLogger('Consumer');
  }

  /**
   * Initialize the consumer
   */
  async initialize(): Promise<void> {
    try {
      this.connection = await this.connectionPool.getConnection();
      this.channel = this.connection.channel;

      this.logger.info('Consumer initialized');
      this.emit('initialized');
    } catch (error) {
      this.logger.error('Failed to initialize consumer', error as Error);
      throw new MessageError(`Consumer initialization failed: ${error}`);
    }
  }

  /**
   * Start consuming messages
   */
  async startConsuming(options: ConsumerOptions): Promise<void> {
    if (!this.channel) {
      throw new MessageError('Consumer not initialized. Call initialize() first.');
    }

    if (this.isConsuming) {
      throw new MessageError('Consumer is already consuming');
    }

    try {
      // Set QoS if prefetchCount is specified
      if (options.prefetchCount !== undefined) {
        await this.channel!.prefetch(options.prefetchCount);
      }

      // Start consuming
      const consumeResult = await this.channel!.consume(
        options.queueName,
        msg => this.handleMessage(msg, options),
        {
          noAck: options.autoAck ?? false,
          exclusive: options.exclusive ?? false,
          consumerTag: options.consumerTag,
          arguments: options.arguments,
        }
      );

      this.consumerTag = consumeResult.consumerTag;
      this.isConsuming = true;

      this.logger.info(
        `Started consuming from queue ${options.queueName} with tag ${this.consumerTag}`
      );
      this.emit('started', options);
    } catch (error) {
      this.logger.error(
        `Failed to start consuming from queue ${options.queueName}`,
        error as Error
      );
      throw new MessageError(`Failed to start consuming: ${error}`);
    }
  }

  /**
   * Stop consuming messages
   */
  async stopConsuming(): Promise<void> {
    if (!this.isConsuming || !this.consumerTag || !this.channel) {
      return;
    }

    try {
      await this.channel.cancel(this.consumerTag);
      this.isConsuming = false;
      this.consumerTag = undefined;

      this.logger.info('Stopped consuming');
      this.emit('stopped');
    } catch (error) {
      this.logger.error('Error stopping consumer', error as Error);
      throw new MessageError(`Failed to stop consuming: ${error}`);
    }
  }

  /**
   * Handle incoming message
   */
  protected async handleMessage(
    msg: ConsumeMessage | null,
    options: ConsumerOptions
  ): Promise<void> {
    if (!msg) {
      this.logger.warn('Received null message');
      return;
    }

    let message: Message;

    try {
      // Deserialize message
      const messageData = this.serializer.deserialize(msg.content.toString());
      if (!isMessageData(messageData)) {
        throw new Error('Invalid message data format');
      }
      message = Message.fromObject(messageData);

      this.logger.debug(`Received message ${message.id}`);
      this.emit('messageReceived', message);
    } catch (error) {
      this.logger.error('Failed to deserialize message', error as Error);

      if (!options.autoAck) {
        this.channel!.nack(msg, false, false); // Reject and don't requeue
      }

      this.emit('messageError', null, error);
      return;
    }

    try {
      // Process message
      const result = await this.handler.handle(message);

      if (result.success) {
        this.logger.debug(`Successfully processed message ${message.id}`);

        if (!options.autoAck) {
          this.channel!.ack(msg);
        }

        this.emit('messageProcessed', message, result);
      } else {
        this.logger.warn(`Failed to process message ${message.id}: ${result.message}`);

        if (!options.autoAck) {
          this.channel!.nack(msg, false, result.shouldRetry);
        }

        this.emit('messageProcessingFailed', message, result);
      }
    } catch (error) {
      this.logger.error(`Error processing message ${message.id}`, error as Error);

      try {
        const errorResult = await this.handler.onError?.(message, error as Error);
        const shouldRetry =
          errorResult?.shouldRetry ?? this.handler.shouldRetry?.(message, error as Error) ?? false;

        if (!options.autoAck) {
          this.channel!.nack(msg, false, shouldRetry);
        }

        this.emit('messageError', message, error);
      } catch (handlerError) {
        this.logger.error(
          `Error in error handler for message ${message.id}`,
          handlerError as Error
        );

        if (!options.autoAck) {
          this.channel!.nack(msg, false, false); // Don't retry if error handler fails
        }
      }
    }
  }

  /**
   * Close the consumer
   */
  async close(): Promise<void> {
    if (this.isConsuming) {
      await this.stopConsuming();
    }

    if (this.channel) {
      try {
        await this.channel.close();
        this.logger.info('Consumer closed');
        this.emit('closed');
      } catch (error) {
        this.logger.error('Error closing consumer', error as Error);
      } finally {
        this.channel = undefined;
      }
    }

    // 返回连接到连接池
    if (this.connection) {
      try {
        await this.connectionPool.returnConnection(this.connection);
        this.logger.debug('Connection returned to pool');
      } catch (error) {
        this.logger.error('Error returning connection to pool', error as Error);
      } finally {
        this.connection = undefined;
      }
    }
  }

  /**
   * Get consumer status
   */
  getStatus(): {
    isConsuming: boolean;
    consumerTag?: string;
  } {
    return {
      isConsuming: this.isConsuming,
      consumerTag: this.consumerTag || undefined,
    };
  }
}

/**
 * Concurrent consumer that processes messages in parallel
 */
export class ConcurrentConsumer extends Consumer {
  private readonly concurrency: number;
  private activeProcessing = 0;
  private readonly processingQueue: Array<() => Promise<void>> = [];

  constructor(
    connectionPool: ConnectionPool,
    handler: MessageHandler,
    concurrency = 5,
    serializer: MessageSerializer = new JSONSerializer(),
    logger?: Logger
  ) {
    super(connectionPool, handler, serializer, logger);
    this.concurrency = concurrency;
  }

  /**
   * Start consuming with concurrency control
   */
  async startConsuming(options: ConsumerOptions): Promise<void> {
    // Set prefetch to concurrency level if not specified
    if (options.prefetchCount === undefined) {
      options.prefetchCount = this.concurrency;
    }

    await super.startConsuming(options);
  }

  /**
   * Handle message with concurrency control
   */
  protected async handleMessage(
    msg: ConsumeMessage | null,
    options: ConsumerOptions
  ): Promise<void> {
    if (!msg) {
      return;
    }

    // If we're at max concurrency, queue the message processing
    if (this.activeProcessing >= this.concurrency) {
      this.processingQueue.push(() => this.processMessage(msg, options));
      return;
    }

    await this.processMessage(msg, options);
  }

  private async processMessage(msg: ConsumeMessage, options: ConsumerOptions): Promise<void> {
    this.activeProcessing++;

    try {
      await super['handleMessage'](msg, options);
    } finally {
      this.activeProcessing--;

      // Process next queued message if any
      const nextProcessor = this.processingQueue.shift();
      if (nextProcessor) {
        nextProcessor().catch(error => {
          this.logger.error('Error processing queued message', error);
        });
      }
    }
  }
}

/**
 * Batch consumer that collects messages before processing
 */
export class BatchConsumer extends Consumer {
  private readonly batchSize: number;
  private readonly batchTimeout: number;
  private readonly batch: Array<{ message: Message; msg: ConsumeMessage }> = [];
  private batchTimer?: NodeJS.Timeout;

  constructor(
    connectionPool: ConnectionPool,
    handler: MessageHandler,
    batchSize = 10,
    batchTimeout = 5000,
    serializer: MessageSerializer = new JSONSerializer(),
    logger?: Logger
  ) {
    super(connectionPool, handler, serializer, logger);
    this.batchSize = batchSize;
    this.batchTimeout = batchTimeout;
  }

  /**
   * Handle message by adding to batch
   */
  protected async handleMessage(
    msg: ConsumeMessage | null,
    options: ConsumerOptions
  ): Promise<void> {
    if (!msg) {
      return;
    }

    try {
      const messageData = this.serializer.deserialize(msg.content.toString());
      if (!isMessageData(messageData)) {
        throw new Error('Invalid message data format');
      }
      const message = Message.fromObject(messageData);

      this.batch.push({ message, msg });

      // Process batch if it's full or timeout
      if (this.batch.length >= this.batchSize) {
        await this.processBatch(options);
      } else if (!this.batchTimer) {
        this.batchTimer = setTimeout(() => {
          this.processBatch(options).catch(error => {
            this.logger.error('Error processing batch', error);
          });
        }, this.batchTimeout);
      }
    } catch (error) {
      this.logger.error('Failed to add message to batch', error as Error);

      if (!options.autoAck) {
        this.channel!.nack(msg, false, false);
      }
    }
  }

  private async processBatch(options: ConsumerOptions): Promise<void> {
    if (this.batch.length === 0) {
      return;
    }

    const currentBatch = [...this.batch];
    this.batch.length = 0;

    if (this.batchTimer) {
      clearTimeout(this.batchTimer);
      this.batchTimer = undefined;
    }

    try {
      // Process all messages in the batch
      const messages = currentBatch.map(item => item.message);

      // Create a batch message containing all messages
      const batchMessage = new Message(messages);
      const result = await this.handler.handle(batchMessage);

      if (result.success) {
        // Ack all messages
        if (!options.autoAck) {
          currentBatch.forEach(item => this.channel!.ack(item.msg));
        }

        this.emit('batchProcessed', messages, result);
      } else {
        // Nack all messages
        if (!options.autoAck) {
          currentBatch.forEach(item => this.channel!.nack(item.msg, false, result.shouldRetry));
        }

        this.emit('batchProcessingFailed', messages, result);
      }
    } catch (error) {
      this.logger.error('Error processing batch', error as Error);

      // Nack all messages
      if (!options.autoAck) {
        currentBatch.forEach(item => this.channel!.nack(item.msg, false, false));
      }

      this.emit(
        'batchError',
        currentBatch.map(item => item.message),
        error
      );
    }
  }
}

/**
 * Utility functions for creating consumers
 */
export const createConsumer = (
  connectionPool: ConnectionPool,
  handler: MessageHandler,
  serializer?: MessageSerializer,
  logger?: Logger
): Consumer => {
  return new Consumer(connectionPool, handler, serializer, logger);
};

export const createConcurrentConsumer = (
  connectionPool: ConnectionPool,
  handler: MessageHandler,
  concurrency = 5,
  serializer?: MessageSerializer,
  logger?: Logger
): ConcurrentConsumer => {
  return new ConcurrentConsumer(connectionPool, handler, concurrency, serializer, logger);
};

export const createBatchConsumer = (
  connectionPool: ConnectionPool,
  handler: MessageHandler,
  batchSize = 10,
  batchTimeout = 5000,
  serializer?: MessageSerializer,
  logger?: Logger
): BatchConsumer => {
  return new BatchConsumer(connectionPool, handler, batchSize, batchTimeout, serializer, logger);
};
